{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Measuring data lake stats\n",
    "This notebook is used to measure some aggregated statistics on each data lake. Every table in each data lake is read and \n",
    "some metrics are evaluated, then the result is saved on disk. \n",
    "\n",
    "Some of the statistics measured here are reported in table  of the paper. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/home/soda/rcappuzz/work/benchmark-join-suggestions\n"
     ]
    }
   ],
   "source": [
    "cd ~/bench"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import polars as pl\n",
    "from pathlib import Path\n",
    "import polars.selectors as cs\n",
    "from tqdm.notebook import tqdm\n",
    "from joblib import Parallel, delayed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def table_profile(table_path, data_lake):\n",
    "    df = pl.read_parquet(table_path)\n",
    "    n_num = df.select(cs.numeric()).shape[1]\n",
    "    c_num = df.select(~cs.numeric()).shape[1]\n",
    "    if len(df) > 0:\n",
    "        avg_null = df.null_count().mean_horizontal().item() / len(df)\n",
    "    else:\n",
    "        avg_null = 0\n",
    "    d = {\n",
    "        \"data_lake\": data_lake,\n",
    "        \"table\": table_path.stem,\n",
    "        \"num_attr\": n_num,\n",
    "        \"cat_attr\": c_num,\n",
    "        \"n_rows\": len(df),\n",
    "        \"n_cols\": len(df.columns),\n",
    "        \"avg_null\": avg_null,\n",
    "    }\n",
    "\n",
    "    return d\n",
    "\n",
    "\n",
    "def get_stats(df: pl.DataFrame):\n",
    "    return df.select(\n",
    "        pl.col(\"data_lake\").first(),\n",
    "        pl.col(\"n_tables\").first(),\n",
    "        pl.col(\"n_rows\").sum().alias(\"tot_rows\"),\n",
    "        pl.col(\"n_cols\").sum().alias(\"tot_cols\"),\n",
    "        pl.col(\"n_cols\").mean().alias(\"mean_n_cols\"),\n",
    "        pl.col(\"n_cols\").median().alias(\"median_n_cols\"),\n",
    "        pl.col(\"n_rows\").mean().alias(\"mean_n_rows\"),\n",
    "        pl.col(\"n_rows\").median().alias(\"median_n_rows\"),\n",
    "        pl.col(\"num_attr\").mean().alias(\"mean_num_attr\"),\n",
    "        pl.col(\"num_attr\").median().alias(\"median_num_attr\"),\n",
    "        pl.col(\"cat_attr\").mean().alias(\"mean_cat_attr\"),\n",
    "        pl.col(\"cat_attr\").median().alias(\"median_cat_attr\"),\n",
    "        pl.col(\"avg_null\").mean().alias(\"mean_avg_null\"),\n",
    "        pl.col(\"avg_null\").median().alias(\"median_avg_null\"),\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "path_list = list(\n",
    "    map(\n",
    "        Path,\n",
    "        [\n",
    "            \"data/yadl/binary_update/\",\n",
    "            \"data/yadl/wordnet_full/\",\n",
    "            \"data/yadl/wordnet_vldb_10/\",\n",
    "            \"data/yadl/wordnet_vldb_50/\",\n",
    "            \"data/open_data_us/\",\n",
    "        ],\n",
    "    )\n",
    ")\n",
    "stats = []\n",
    "for path in path_list:\n",
    "    profiles = []\n",
    "    profiles = Parallel(n_jobs=8, verbose=0)(\n",
    "        delayed(table_profile)(tab, path.stem)\n",
    "        for tab in tqdm(\n",
    "            path.glob(\"**/*.parquet\"),\n",
    "            total=sum(1 for _ in path.glob(\"**/*.parquet\")),\n",
    "            position=0,\n",
    "            leave=False,\n",
    "            desc=path.stem,\n",
    "        )\n",
    "    )\n",
    "    df = pl.from_dicts(profiles).with_columns(pl.lit(len(profiles)).alias(\"n_tables\"))\n",
    "    stats.append(get_stats(df))\n",
    "df_stats = pl.concat(stats)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\n",
      "KeyboardInterrupt\n",
      "\n"
     ]
    }
   ],
   "source": [
    "display(\n",
    "    df_stats.transpose(include_header=True, column_names=\"data_lake\")\n",
    "    .to_pandas()\n",
    "    .style.format(precision=2)\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save stats on disk.\n",
    "df_stats.transpose(include_header=True, column_names=\"data_lake\").write_csv(\n",
    "    \"stats_data_lakes.csv\"\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
